Dabai的个人博客

Atomix源码分析:Raft Client的实现和与Server的交互

Atomix是一个基于Raft协议的分布式协同框架,是ONOS分布式集群和分布式原语实现的基础,Atomix提供集群的管理,通信,领导选择,Key-Value存储,以及数据分片等功能。从Atomix 2.0开始,raft协议的实现Copycat已经集成到Atomix了,下面将结合Atomix 2.0对Atomix源码进行分析,从而深入理解Raft协议的原理和Atomix的工作机制。(更新中~)

Atomix里的Raft实现主要包括raft client和raft server,raft server就是Raft里面实现数据一致性同步的节点,Raft里面的Leader,Candidate,Follower身份的确立和转换都是围绕raft server展开的,raft client通过raft session 与Raft状态机集群交互,从而完成状态机的查询,修改等操作。

注:Atomix 2.x在tests模块下新增了一个Raft测试文件RaftFuzzTest.java。这个文件包含了完整的raft client和raft server的建立过程,阅读这个文件的源码对了解和学习Atomix很有帮助!

1. Raft client的实现

Raft client的实现主要包括RaftClient,RaftProxy,RaftSession,包括Client与Server连接的建立,会话的管理,以及提交操作等功能。

1.1 创建RaftClient

RaftClient提供了与Raft集群建立连接的接口,DefaultRaftClientRaftClient的实现类,可以使用构造器创建RaftClient,在Atomix工程里面的RaftFuzzTest里有创建RaftClient的例子:

1
2
3
4
5
6
RaftClient client = RaftClient.newBuilder()
.withMemberId(memberId) //1
.withProtocol(protocol) //2
.build();

client.connect(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).join();

其中,方法1设置该RaftClient的ID,一般用本机的IP地址表示,而方法2设置client与server的通信协议,该协议对象需要实现RaftClientProtocol接口。

创建好RaftClient后,使用RaftClient.connect与Raft cluster建立连接:

1
2
3
4
5
6
7
8
9
10
/**
* Connects the client to Raft cluster via the provided server addresses.
* <p>
* The client will connect to servers in the cluster according to the pattern specified by the configured
* {@link CommunicationStrategy}.
*
* @param members A set of server addresses to which to connect.
* @return A completable future to be completed once the client is registered.
*/
CompletableFuture<RaftClient> connect(Collection<MemberId> members);

由上面的注释可知该该raft client与raft cluster的哪一个成员建立连接,是根据CommunicationStrategy类进行配置的,而配置这CommunicationStrategyRaftProxy来完成的。

PS: 注意这里虽然说是建立连接,但也只是做一些建立连接的初始化工作,正真的建立(如发送建立会话的请求)在RaftProxy创建完成后进行的。

1.2 创建RaftProxy

RaftProxy提供了提交特定操作到raft cluster状态机的接口,主要包括invoke方法和addEventListener方法,RaftFuzzTest中给了RaftProxy的创建示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Creates a test session.
*/
private RaftProxy createProxy(RaftClient client, ReadConsistency consistency) {
return client.newProxyBuilder() //1
.withName("test") //2
.withServiceType("test")
.withReadConsistency(consistency) //3
.withCommunicationStrategy(COMMUNICATION_STRATEGY) //4
.build() //5
.open()
.join();
}

上述代码方法1返回一个新的RaftProxy.Builder从而开始建立会话连接;方法2是设置该会话的名称;方法3设置状态机查询的一致性等级,有线性一致性,顺序一致性等,Raft协议中线性一致性的查询最终必须由Leader来处理;而方法4则设置该会话与Raft cluster的通信策略,可以指定该会话直接和Leader建立,也可让该会话随机和cluster中的某一个节点建立。更多的细节原理可以阅读ReadConsistencyCommunicationStrategy的文档注释。上面的代码将为一个raft client创建一个会话连接,可以重复上述步骤为该client建立多个到raft cluster的会话连接。需要注意的是,方法1中返回的RaftProxy.Builder是在DefaultRaftClient内实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Default Raft session builder.
*/
private class SessionBuilder extends RaftProxy.Builder {
@Override
public RaftProxy build() {
// Create a client builder that uses the session manager to open a session.
RaftProxyClient.Builder clientBuilder = new RaftProxyClient.Builder() {
@Override
public CompletableFuture<RaftProxyClient> buildAsync() {
return sessionManager.openSession(name, serviceType, readConsistency, communicationStrategy, timeout);
}
}
....
}

通过注释可知,该Builder会建立一个会话连接,这是通过该Builder实现buildAsync方法并在buildAsync内部调用RaftProxyManageropenSession方法实现的,buildAsync最终由方法5调用。

1.3 创建会话连接

Raft client和server所有会话的创建和管理由RaftProxyManager类实现。RaftProxyManager的构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public RaftProxyManager(String clientId, MemberId memberId, RaftClientProtocol protocol, MemberSelectorManager selectorManager, ScheduledExecutorService threadPoolExecutor) {
this.clientId = checkNotNull(clientId, "clientId cannot be null");
this.memberId = checkNotNull(memberId, "memberId cannot be null");
this.protocol = checkNotNull(protocol, "protocol cannot be null");
this.selectorManager = checkNotNull(selectorManager, "selectorManager cannot be null");
this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftClient.class)
.addValue(clientId)
.build());

this.connection = new RaftProxyConnection(
protocol,
selectorManager.createSelector(CommunicationStrategy.ANY),
new ThreadPoolContext(threadPoolExecutor),
LoggerContext.builder(RaftClient.class)
.addValue(clientId)
.build());
this.threadPoolExecutor = checkNotNull(threadPoolExecutor, "threadPoolExecutor cannot be null");

}

注意这里面构造了一个RaftProxyConnection对象,建立会话的open-session消息以及保持该会话连接的keep-alive消息的发送和接收都是由该对象完成的。传入的参数selectorManager.createSelector(CommunicationStrategy.ANY表示该connection是随机和cluster中的某一个成员建立的,后面的keep-alive请求也都会持续发往该成员节点并由该节点处理。笔者阅读了Raft server实现的相关代码,发现raft follower角色的成员也不直接处理open-session或keep-alive请求,而是将这个请求forward给leader处理,具体的逻辑可以参考RaftRole接口及其类的实现。需要注意的,client session信息都是以复制状态机的形式保存的,也就是说raft cluster中的每一个节点都保存有该session的信息,另外当leader处理一个open-session或keep-alive请求时,也会先更新follower的状态机信息(主要是该会话的时间戳),然后提交该更新到本地状态机。

RaftProxyManager类里的openSession方法调用RaftProxyConnection类的openSession方法发送一个open-session请求,返回的response结果中包含该session的sessionId,timeout等信息,然后使用keepAliveSessions方法发送该会话的keep-alive消息。

RaftProxyManager类里面,keep-alive的发送和调度分别是由keepAliveSessions(long timeout)scheduleKeepAlive(long timeout, long delta)方法完成的。注意这个timeout的值是根据OpenSessionResponse获得的,在LeaderRole中可以看到OpenSessionResponse的构造逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public CompletableFuture<OpenSessionResponse> onOpenSession(OpenSessionRequest request) {
final long term = raft.getTerm();
final long timestamp = System.currentTimeMillis();

// If the client submitted a session timeout, use the client's timeout, otherwise use the configured
// default server session timeout.
final long timeout;
if (request.timeout() != 0) {
timeout = request.timeout();
} else {
timeout = raft.getSessionTimeout().toMillis();
}
......

在使用RaftProxy.Builder构造RaftProxy时可以设置该session的timeout,如果没有设定,timeout默认为0,那么Leader也将通过request-response返回一个timeout,这个timeout在RaftContext类中被默认设置为5000ms。

另外scheduleKeepAlive(long timeout, long delta)方法中的delta参数表示从发送一个keep-alive请求到接收到该请求响应的时间间隔,keep-alive循环调度的方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 /**
* Schedules a keep-alive request.
*/
private synchronized void scheduleKeepAlive(long timeout, long delta) {
ScheduledFuture<?> keepAliveFuture = keepAliveFutures.remove(timeout);
if (keepAliveFuture != null) {
keepAliveFuture.cancel(false);
}

// Schedule the keep alive for 3/4 the timeout minus the delta from the last keep-alive request.
keepAliveFutures.put(timeout, threadPoolExecutor.schedule(() -> {
if (open.get()) {
keepAliveSessions(timeout);
}
}, Math.max(Math.max((long)(timeout * .75) - delta, timeout - 2500 - delta), 0), TimeUnit.MILLISECONDS));
}

上面的delta表示从发送keep-alive请求到获得该请求响应的时间间隔,一般该时间很短(2~4ms),因此若timeout设置成6000ms,那么keep-alive消息的发送间隔为4500ms左右。

1.4 Raft状态机保存并更新会话信息

Raft Client与Raft Server的会话信息保存在Raft状态机中,也是就是说各个节点保存有与整个Raft cluster建立的所有的会话信息,并且Raft状态机会根据KeepAliveRequestKeepAliveResponse消息来不断更新状态机,从而判断各个会话是否存活。

Raft状态机更新通过类RaftServiceManager实现,该类里面的applyKeepAlive方法就是根据KeepAliveRequest请求来更新Raft状态机。通过这部分代码发现,Raft状态机总是以KeepAliveEntry的时间戳来更新当前会话的最新时间,而KeepAliveEntry是由Leader产生的,因此即使各个节点的物理时间不同步,但各个节点会话的保存的所有会话的更新时间都是一样的。由于更新Raft状态机是通过写日志的方式进行的,Atomix处理KeepAliveRequest请求比较消耗系统资源。

另外需要注意的是,若Leader节点发生失效后选举产生了新的Leader节点,Raft状态机会重置各个会话的Timeout计时器,从而避免选举时间过长导致判定Raft会话失效。这部分代码在RaftServiceManager类里面的applyInitialize方法中实现:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Applies an initialize entry.
* <p>
* Initialize entries are used only at the beginning of a new leader's term to force the commitment of entries from
* prior terms, therefore no logic needs to take place.
*/
private CompletableFuture<Void> applyInitialize(Indexed<InitializeEntry> entry) {
for (DefaultServiceContext service : services.values()) {
service.keepAliveSessions(entry.index(), entry.entry().timestamp());
}
return CompletableFuture.completedFuture(null);
}

2. Raft client和raft server的交互

2.1 Raft client和raft server间消息的发送和回传

由于Atomix中使用了很多函数式编程以及回调处理,很多的代码调用关系不是很直观,下面将以RaftFuzzTest的实现为例介绍Keep-Alive消息的发送和处理流程。

  1. 首先需要了解的是RaftFuzzTest中使用的RaftClientProtocolRaftServerProtocol都是基于MessagingService消息处理接口实现的。该消息处理接口会区分每一种消息的类型,同时也需要给不同的消息类型注册不同的处理方法,当收到一种特定类型的消息时,就会回调对应的处理方法。

  2. 如前面所述,将使用RaftProxyConnection类的keepAlive方法开始发送keep-alive请求:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
    * Sends a keep alive request to the given node.
    *
    * @param request the request to send
    * @return a future to be completed with the response
    */
    public CompletableFuture<KeepAliveResponse> keepAlive(KeepAliveRequest request) {
    CompletableFuture<KeepAliveResponse> future = new CompletableFuture<>();
    if (context.isCurrentContext()) {
    sendRequest(request, protocol::keepAlive, next(), future);
    } else {
    context.execute(() -> sendRequest(request, protocol::keepAlive, next(), future));
    }
    return future;
    }

    这里的protocol是一个实现了RaftClientProtocol接口对象,负责消息的发送,该对象在创建RaftClient时注册进来的。而next()则返回接收该request请求的目的成员ID,然后sendRequest将调用RaftClientProtocolkeepAlive方法发送该keep-alive请求,RaftFuzzTest中使用的RaftClientProtocol接口是在RaftClientMessagingProtocol类中实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Override
    public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) {
    return sendAndReceive(memberId, "keep-alive", request);
    }

    protected <T, U> CompletableFuture<U> sendAndReceive(MemberId memberId, String type, T request) {
    Endpoint endpoint = endpoint(memberId);
    if (endpoint == null) {
    return Futures.exceptionalFuture(new ConnectException());
    }
    return messagingService.sendAndReceive(endpoint, type, serializer.encode(request))
    .thenApply(serializer::decode);
    }
  3. 虽然上面的回调直接返回了处理后的结果,那么这个请求在实际中是怎么处理和返回的呢?事实上,上面的sendAndReceive(memberId, "keep-alive", request)方法使用了一个很重要的参数,即这个请求的消息类型是:”keep-alive”,而raft server节点会根据请求的类型调用对应的处理方法。RaftServerProtocol接口中有一个注册处理keep-alive消息的handler的方法:

    1
    2
    3
    4
    5
    6
    /**
    * Registers a keep alive request callback.
    *
    * @param handler the open session request handler to register
    */
    void registerKeepAliveHandler(Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> handler);

    RaftFuzzTest使用的RaftServerMessagingProtocol是一个实现了RaftServerProtocol接口的类,在这个类里面,注册处理keep-alive消息的Handler的方法实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Override
    public void registerKeepAliveHandler(Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> handler) {
    registerHandler("keep-alive", handler);
    }
    protected <T, U> void registerHandler(String type, Function<T, CompletableFuture<U>> handler) {
    messagingService.registerHandler(type, (e, p) -> {
    CompletableFuture<byte[]> future = new CompletableFuture<>();
    handler.apply(serializer.decode(p)).whenComplete((result, error) -> { //1
    if (error == null) {
    future.complete(serializer.encode(result)); //2
    } else {
    future.completeExceptionally(error);
    }
    });
    return future; //3
    });
    }

    由此可以看出,一个特定类型消息的处理方法最终会注册到messagingService里面,方法1调用了Handler处理方法的,而方法2则等待处理结果的返回,方法3则返回一个异步执行的结构。因此当MessagingService收到一个特定类型的消息时,就会调用已注册的对应该消息的处理方法,从而完成消息的处理,MessagingService会回传该Handler返回的结果。

  4. 上面处理keep-alive请求的Handler的注册的过程是在RaftContext类里实现的,这也是一个非常重要的类,前面也有提及,raft状态机状态的管理就是通过这个类实现的。注册Handler的部分代码如下所示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
     /**
    * Registers server handlers on the configured protocol.
    */
    private void registerHandlers(RaftServerProtocol protocol) {
    protocol.registerOpenSessionHandler(request -> runOnContext(() -> role.onOpenSession(request)));
    protocol.registerCloseSessionHandler(request -> runOnContext(() -> role.onCloseSession(request)));
    protocol.registerKeepAliveHandler(request -> runOnContext(() -> role.onKeepAlive(request)));
    ......
    }

    因此,当raft server收到一个“keep-alive”类型的请求时,就会调用role.onKeepAlive(request))方法,并返回一个处理后的结果,即返回一个CompletableFuture<KeepAliveResponse>>类型的值。

    由上面的分析可知,Atomix里面消息的处理流程的核心思想就是注册和回调,同时使用CompletableFuture作为异步计算的结果,提高程序的运行效率。

2.2 Client对状态机的操作

Client对状态机的操作包括写和读,分别用Command和Query来表示。Command操作会使状态机发生转移,需要通过leader写log并经过多数派的提交确认;Query操作可以根据一致性等级进行操作,若需实现线性一致性,需要leader的处理。

2.2.1 Command操作

参考: